package com.zzjson;

import javax.annotation.PreDestroy;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Author 霄池
 * @Date 2023/11/8 10:53
 */
public class DataFetch {

    private final ExecutorService producerExecutor = Executors.newSingleThreadExecutor();

    private final BlockingQueue<String> dataBlockingQueue = new LinkedBlockingQueue<>(20000);

    private final BlockingQueue<Runnable> consumerThreadQueue = new ArrayBlockingQueue<>(200);
    private static final int MAX_THREADS = Runtime.getRuntime().availableProcessors() + 1;
    private final ExecutorService consumerExecutor = new ThreadPoolExecutor(
        Runtime.getRuntime().availableProcessors(),
        MAX_THREADS,
        4,
        TimeUnit.SECONDS,
        consumerThreadQueue,
        newThreadFactory("消费线程", Boolean.FALSE),
        new ThreadPoolExecutor.AbortPolicy());

    private volatile boolean readComplete = false;

    public static void main(String[] args) {
        DataFetch dataFetch = new DataFetch();
        dataFetch.process();
    }

    public void process() {
        //1. 数据生产
        produceData();

        //2. 数据消费
        consumeData();

        //3. 销毁资源
        //shutdown();
    }

    @PreDestroy
    private void shutdown() {
        producerExecutor.shutdown();
        consumerExecutor.shutdown();
        try {
            if (!producerExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
                producerExecutor.shutdownNow();
            }
            if (!consumerExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
                consumerExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            producerExecutor.shutdownNow();
            consumerExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    private void consumeData() {
        for (int i = 0; i < MAX_THREADS; i++) {
            consumerExecutor.execute(() -> {
                try {
                    while (!readComplete || !dataBlockingQueue.isEmpty()) {
                        String data = dataBlockingQueue.take();
                        doConsumer(data);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
    }

    private void doConsumer(String data) {
        // 数据处理逻辑
        System.out.println("消费数据：" + data);
    }

    public void produceData() {
        producerExecutor.execute(() -> {
            try {
                for (int i = 0; i < 1000000000; i++) {
                    String data = "数据" + i;
                    System.out.println("生成数据" + data);
                    // fixme 通过数据特性进行路由，这里可以按照对应的数据特性路由到不同的队列
                    dataBlockingQueue.put(data);
                }
                // 数据读取完毕
                readComplete = true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    public static ThreadFactory newThreadFactory(String processName, boolean isDaemon) {
        return newGenericThreadFactory(processName, isDaemon);
    }

    public static ThreadFactory newGenericThreadFactory(final String processName, final boolean isDaemon) {
        return new ThreadFactory() {
            private final AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, String.format("%s_%d", processName, this.threadIndex.incrementAndGet()));
                thread.setDaemon(isDaemon);
                return thread;
            }
        };
    }

}
